import glob
import os
import requests
import shutil
import warnings
from random import shuffle
from itertools import product, chain
from pathlib import Path
import contextily
import dask
import dask_geopandas
import dask.bag
import dask.dataframe
import geopandas
import numpy
import pandas
import pygeos
import pyogrio
import xarray, rioxarray
import libpysal
from dask_geopandas.hilbert_distance import _hilbert_distance
from dask.distributed import Client, LocalCluster
from shapely.geometry import box
from shapely.ops import polygonize
from tqdm.auto import tqdm
from sqlalchemy import create_engine
import tools
client = Client(
LocalCluster(n_workers=16, threads_per_worker=1)
)
client
Client
Client-bc3f5227-d2cd-11ec-9450-2f18c6b1b888
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
32b8ed88
| Dashboard: http://127.0.0.1:8787/status | Workers: 16 |
| Total threads: 16 | Total memory: 125.54 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-bf98929e-cb53-4b8b-91ee-4f4a13dee8c6
| Comm: tcp://127.0.0.1:36357 | Workers: 16 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 16 |
| Started: Just now | Total memory: 125.54 GiB |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:46027 | Total threads: 1 |
| Dashboard: http://127.0.0.1:38579/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:37629 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-hwa_n2zz | |
Worker: 1
| Comm: tcp://127.0.0.1:43179 | Total threads: 1 |
| Dashboard: http://127.0.0.1:41611/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:46595 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-vmf1txw0 | |
Worker: 2
| Comm: tcp://127.0.0.1:39211 | Total threads: 1 |
| Dashboard: http://127.0.0.1:45529/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:35869 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-h2ac0os0 | |
Worker: 3
| Comm: tcp://127.0.0.1:33203 | Total threads: 1 |
| Dashboard: http://127.0.0.1:39627/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:44795 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-6f9cmv22 | |
Worker: 4
| Comm: tcp://127.0.0.1:43663 | Total threads: 1 |
| Dashboard: http://127.0.0.1:39495/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:46249 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-0nxedymr | |
Worker: 5
| Comm: tcp://127.0.0.1:38829 | Total threads: 1 |
| Dashboard: http://127.0.0.1:42013/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:40197 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-i9y5e0br | |
Worker: 6
| Comm: tcp://127.0.0.1:37931 | Total threads: 1 |
| Dashboard: http://127.0.0.1:44509/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:43253 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-twh6xxas | |
Worker: 7
| Comm: tcp://127.0.0.1:37855 | Total threads: 1 |
| Dashboard: http://127.0.0.1:40415/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:38629 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-8w8ulh3r | |
Worker: 8
| Comm: tcp://127.0.0.1:44611 | Total threads: 1 |
| Dashboard: http://127.0.0.1:41977/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:37701 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-5110dgtq | |
Worker: 9
| Comm: tcp://127.0.0.1:42563 | Total threads: 1 |
| Dashboard: http://127.0.0.1:34355/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:46045 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-y0x1qg8j | |
Worker: 10
| Comm: tcp://127.0.0.1:44685 | Total threads: 1 |
| Dashboard: http://127.0.0.1:43417/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:39597 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-o_0_yjjz | |
Worker: 11
| Comm: tcp://127.0.0.1:45963 | Total threads: 1 |
| Dashboard: http://127.0.0.1:46035/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:46753 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-r_189adt | |
Worker: 12
| Comm: tcp://127.0.0.1:34133 | Total threads: 1 |
| Dashboard: http://127.0.0.1:45197/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:33811 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-15s4f9i8 | |
Worker: 13
| Comm: tcp://127.0.0.1:35993 | Total threads: 1 |
| Dashboard: http://127.0.0.1:38181/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:38281 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-p3p680a0 | |
Worker: 14
| Comm: tcp://127.0.0.1:42267 | Total threads: 1 |
| Dashboard: http://127.0.0.1:39945/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:35803 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-075xpwyb | |
Worker: 15
| Comm: tcp://127.0.0.1:43685 | Total threads: 1 |
| Dashboard: http://127.0.0.1:40265/status | Memory: 7.85 GiB |
| Nanny: tcp://127.0.0.1:38161 | |
| Local directory: /home/jovyan/work/signature_ai/create_chips/dask-worker-space/worker-rp_3xxwt | |
Define specification.
specs = {
'chip_size': 32,
'bands': [1, 2, 3], #RGB
'mosaic_p': (
'/home/jovyan/work/'
'ghs_composite_s2/GHS-composite-S2.vrt'
),
}
Load chip bounds
chip_bounds = dask_geopandas.read_parquet(f"/home/jovyan/work/chips_gb/chip_bounds_{specs['chip_size']}/").set_crs(27700)
Load signature geometry
%%time
spsig = pyogrio.read_dataframe("../../urbangrammar_samba/spatial_signatures/signatures/signatures_combined_levels_simplified_clip.gpkg")
CPU times: user 833 ms, sys: 375 ms, total: 1.21 s
Wall time: 2 s
Select only chups fully within a signature type
polygons_within = dask_geopandas.sjoin(chip_bounds, spsig[['signature_type', 'geometry']], op="within")
/tmp/ipykernel_54352/793181311.py:1: FutureWarning: The `op` parameter is deprecated and will be removed in a future release. Please use the `predicate` parameter instead. polygons_within = dask_geopandas.sjoin(chip_bounds, spsig[['signature_type', 'geometry']], op="within")
types = ['0_0', '1_0', '3_0', '4_0', '5_0', '6_0', '7_0', '8_0', '2_0', '2_1', '2_2', '9_0', '9_1', '9_2', '9_4', '9_5']
polygons_within = polygons_within[polygons_within['signature_type'].isin(types)] # remove outlier types
Drop excessive number of chips for types with more than 50k chips.
val_counts = polygons_within["signature_type"].value_counts()
oversampled = val_counts[val_counts > 50000]
replacements = []
for t, c in oversampled.iteritems():
mask = polygons_within["signature_type"] == t
single = polygons_within[mask]
sampled = single.sample(frac = 50000 / c / 9)
with_neighbors = dask_geopandas.sjoin(single[["geometry"]], sampled[['signature_type', 'geometry']])
replacements.append(with_neighbors)
# break
polygons_within = polygons_within[~mask]
replacements.append(polygons_within)
balanced_chips = dask.dataframe.multi.concat(replacements)
balanced_chips.to_parquet(f"/home/jovyan/work/chips_gb/chip_bounds_{specs['chip_size']}_balanced/")
Create chips
import numpy as np
import rasterio
def bag_of_chips(chip_bbs, specs, npartitions):
'''
Load imagery for `chip_bbs` using a Dask bag
...
Arguments
---------
chip_bbs : GeoDataFrame
Geo-table with bounding boxes of the chips to load
specs : dict
Metadata dict, including, at least:
- `bands`: band index of each band of interest
- `chip_size`: size of each chip size expressed in pixels
- `mosaic_p`: path to the mosaic/file of imagery
npartitions : int
No. of partitions to split `chip_bbs` before sending to
Dask for distributed computation
Returns
-------
chips : ndarray
Numpy tensor of (N, chip_size, chip_size, n_bands) dimension
with imagery data
'''
# Split chip_bbs
thr = np.linspace(0, chip_bbs.shape[0], npartitions+1, dtype=int)
chunks = [
(chip_bbs.iloc[thr[i]:thr[i+1], :], specs) for i in range(len(thr)-1)
]
# Set up the bag
bag = dask.bag.from_sequence(
chunks, npartitions=npartitions
).map(chip_loader)
# Compute
chips = np.concatenate(bag.compute())
return chips
def chip_loader(pars):
'''
Load imagery for `chip_bbs`
...
Arguments (wrapped in `pars`)
-----------------------------
chip_bbs : GeoDataFrame
Geo-table with bounding boxes of the chips to load
specs : dict
Metadata dict, including, at least:
- `bands`: band index of each band of interest
- `chip_size`: size of each chip size expressed in pixels
- `mosaic_p`: path to the mosaic/file of imagery
Returns
-------
chips : ndarray
Numpy tensor of (N, chip_size, chip_size, n_bands) dimension
with imagery data
'''
chip_bbs, specs = pars
b = len(specs['bands'])
s = specs['chip_size']
chips = np.zeros((chip_bbs.shape[0], b, s, s))
with rasterio.open(specs['mosaic_p']) as src:
for i, tup in enumerate(chip_bbs.itertuples()):
img, transform = rasterio.mask.mask(
src, [tup.geometry], crop=True, all_touched=True
)
img = img[:b, :s, :s]
for ban, (l_min, l_max) in enumerate([(350, 1600), (500, 1600), (600, 1800)]):
img[ban][img[ban] > l_max] = l_max
img[ban][img[ban] < l_min] = l_min
a_std = (img[ban] - l_min) / (l_max - l_min)
img[ban] = a_std * 255
chips[i, :, :, :] = img
chips = np.moveaxis(chips, 1, -1)
return chips.astype(rasterio.uint8)
balanced_chips = dask_geopandas.read_parquet(f"/home/jovyan/work/chips_gb/chip_bounds_{specs['chip_size']}_balanced/")
balanced_chips = balanced_chips.compute().reset_index(drop=True).sample(frac=1, random_state=42)
split = (.4, .1, .4,)
splits = []
for t, c in balanced_chips.signature_type.value_counts().iteritems():
sub = balanced_chips[balanced_chips.signature_type == t]
nn_t, nn_v, ml_t, ml_v = numpy.split(sub, (numpy.cumsum(split) * c).astype(int))
nn_t['split'] = 'nn_train'
nn_v['split'] = 'nn_val'
ml_t['split'] = 'ml_train'
ml_v['split'] = 'ml_val'
splits.append(pandas.concat([nn_t['split'], nn_v['split'], ml_t['split'], ml_v['split']]))
%time splits = pandas.concat(splits).sort_index()
CPU times: user 22.3 ms, sys: 7.14 ms, total: 29.4 ms
Wall time: 16.7 ms
%time balanced_chips = balanced_chips.sort_index()
%time balanced_chips["split"] = splits
CPU times: user 61.9 ms, sys: 0 ns, total: 61.9 ms
Wall time: 46.5 ms
CPU times: user 1.93 ms, sys: 0 ns, total: 1.93 ms
Wall time: 1.56 ms
balanced_chips.drop(columns="index_right").to_parquet(f"../../chips_gb/v2_{specs['chip_size']}_labels.parquet")
/tmp/ipykernel_54352/2143588880.py:1: UserWarning: this is an initial implementation of Parquet/Feather file support and associated metadata. This is tracking version 0.1.0 of the metadata specification at https://github.com/geopandas/geo-arrow-spec
This metadata specification does not yet make stability promises. We do not yet recommend using this in a production setting unless you are able to rewrite your Parquet/Feather files.
To further ignore this warning, you can do:
import warnings; warnings.filterwarnings('ignore', message='.*initial implementation of Parquet.*')
balanced_chips.drop(columns="index_right").to_parquet(f"../../chips_gb/v2_{specs['chip_size']}_labels.parquet")
chips = bag_of_chips(balanced_chips, specs, npartitions=16)
numpy.save(f"../../chips_gb/v2_{specs['chip_size']}.npy", chips)
bds = geopandas.read_parquet(f"../../chips_gb/v2_32_labels.parquet")
ml_val = balanced_chips[balanced_chips['split'] == 'ml_val']
ml_val.explore("signature_type",prefer_canvas=True, tiles="carto db positron")
Make this Notebook Trusted to load map: File -> Trust Notebook